CSV

本文为您介绍CSV格式的使用方法和类型映射。

背景信息

CSV格式允许基于CSV结构读写CSV数据。当前,CSV结构是基于表结构推导而来的。支持CSV格式的连接器包括:消息队列KafkaUpsert Kafka消息队列RocketMQStarRocks对象存储OSS

使用示例

利用Kafka以及CSV格式构建表的示例如下。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'csv.ignore-parse-errors' = 'true',
 'csv.allow-comments' = 'true'
);

配置选项

选项

是否必选

默认值

类型

说明

format

String

声明使用的格式。使用CSV格式时,参数取值为csv。

csv.field-delimiter

,

String

指定字段分隔符,仅可使用单字符,默认为逗号(,)。

参数取值类型如下:

  • 反斜杠字符:用于指定特殊字符。例如,\t代表制表符。

  • unicode编码:在纯SQL文本中指定特殊字符。例如,'csv.field-delimiter' = U&'\0001'代表0x01字符。

csv.disable-quote-character

false

Boolean

是否允许引用值是否引号,参数取值如下:

  • true:禁止对引用的值使用引号。当禁止使用时,选项csv.quote-character不能设置。

  • false(默认值):允许对引用的值使用引号。

csv.quote-character

"

String

指定引用值的引号字符,默认为双引号(")。

csv.allow-comments

false

Boolean

是否允许忽略注释行,参数取值如下:

  • true:允许忽略注释行,注释行以#作为起始字符。同时请确保开启csv.ignore-parse-errors,以允许空行。

  • false(默认值):禁止忽略注释行。

csv.ignore-parse-errors

false

Boolean

当解析异常时,是否允许跳过当前字段,参数取值如下:

  • true:当解析异常时,跳过当前字段,将该字段值设置为null。

  • false(默认值):当解析异常时,抛出错误,作业启动失败。

csv.array-element-delimiter

;

String

指定分隔数组和行元素的字符串,默认为分号(;)。

csv.escape-character

String

指定转义字符,默认禁用。

csv.null-literal

String

指定识别成 null 值的字符串,默认禁用。在输入端会将该字符串转为null值,在输出端会将null值转成该字符串。

csv.write-bigdecimal-in-scientific-notation

true

Boolean

是否将Bigdecimal类型的数据表示为科学计数法,参数取值如下:

  • true(默认值):Bigdecimal类型的数据表示为科学计数法。例:100000表示为1E+5。

  • false:Bigdecimal类型的数据保持原状。例:100000仍表示为100000。

类型映射

Flink中,CSV的格式数据使用jackson databind API解析CSV字符串。FlinkCSV的数据类型的映射关系如下。

Flink SQL类型

CSV类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

INTERVAL

number

ARRAY

array

ROW

object

其他使用说明

对于写入对象存储OSS,目前暂不支持写入CSV格式的文件,具体原因请参见FLINK-30635